Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-13231

`TransactionalMessageCopier.start_node` should wait until the process if fully started

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • None
    • 3.1.0, 2.8.1, 3.0.1
    • None
    • None

    Description

      There is a subtile race condition in the code which bounces the transaction message copier. As you can see in the log snippet above, it is possible that the copier get bounced before it even starts. Note that the process is stated by a separate thread. In this case, the method which stops the current process miss it because the PID is not there yet. However, the stop hangs because the thread does not stop as expected.

      It seems that we should wait until the process is fully started in `restart` or `start_node` to avoid this issue.

      [INFO  - 2021-08-24 07:48:25,882 - transactional_message_copier - _worker - lineno:95]: copier-0: consumed 5250, remaining 28083
      [INFO  - 2021-08-24 07:48:26,121 - transactional_message_copier - _worker - lineno:95]: copier-0: consumed 6000, remaining 27333
      [INFO  - 2021-08-24 07:48:26,379 - transactional_message_copier - _worker - lineno:95]: copier-0: consumed 6750, remaining 26583
      [INFO  - 2021-08-24 07:48:26,536 - transactions_test - bounce_copiers - lineno:144]: copier-0 - progress: 20.25020250202502
      [DEBUG - 2021-08-24 07:48:26,536 - remoteaccount - _log - lineno:160]: ubuntu@worker22: Running ssh command: jps | grep -i TransactionalMessageCopier | awk '{print $1}'
      [DEBUG - 2021-08-24 07:48:26,692 - remoteaccount - _log - lineno:160]: ubuntu@worker22: Running ssh command: kill -9 1567
      [DEBUG - 2021-08-24 07:48:26,733 - remoteaccount - _log - lineno:160]: ubuntu@worker22: Running ssh command: jps | grep -i TransactionalMessageCopier | awk '{print $1}'
      [INFO  - 2021-08-24 07:48:27,021 - background_thread - start_node - lineno:57]: Running TransactionalMessageCopier-0-139963594423096 node 1 on worker22
      [DEBUG - 2021-08-24 07:48:27,021 - remoteaccount - _log - lineno:160]: ubuntu@worker22: Running ssh command: mkdir -p /mnt/transactional_message_copier
      [DEBUG - 2021-08-24 07:48:27,068 - remoteaccount - _log - lineno:160]: ubuntu@worker22: Running ssh command: java -version
      [INFO  - 2021-08-24 07:48:27,163 - kafka - bootstrap_servers - lineno:2606]: Bootstrap client port is: 9092
      [DEBUG - 2021-08-24 07:48:27,163 - transactional_message_copier - _worker - lineno:85]: TransactionalMessageCopier 1 command: export LOG_DIR=/mnt/transactional_message_copier/logs; export KAFKA_OPTS=; export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/mnt/transactional_message_copier/tools-log4j.properties"; /opt/kafka-dev/bin/kafka-run-class.sh org.apache.kafka.tools.TransactionalMessageCopier --broker-list worker13:9092,worker9:9092,worker20:9092 --transactional-id copier-0 --consumer-group transactions-test-consumer-group --input-topic input-topic --output-topic output-topic --input-partition 0 --transaction-size 750 --transaction-timeout 40000 --enable-random-aborts 2>> /mnt/transactional_message_copier/transactional_message_copier.stderr | tee -a /mnt/transactional_message_copier/transactional_message_copier.stdout &
      [DEBUG - 2021-08-24 07:48:27,163 - remoteaccount - _log - lineno:160]: ubuntu@worker22: Running ssh command: export LOG_DIR=/mnt/transactional_message_copier/logs; export KAFKA_OPTS=; export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/mnt/transactional_message_copier/tools-log4j.properties"; /opt/kafka-dev/bin/kafka-run-class.sh org.apache.kafka.tools.TransactionalMessageCopier --broker-list worker13:9092,worker9:9092,worker20:9092 --transactional-id copier-0 --consumer-group transactions-test-consumer-group --input-topic input-topic --output-topic output-topic --input-partition 0 --transaction-size 750 --transaction-timeout 40000 --enable-random-aborts 2>> /mnt/transactional_message_copier/transactional_message_copier.stderr | tee -a /mnt/transactional_message_copier/transactional_message_copier.stdout &
      [INFO  - 2021-08-24 07:49:08,280 - transactional_message_copier - _worker - lineno:95]: copier-0: consumed 0, remaining 26583
      [INFO  - 2021-08-24 07:49:08,763 - transactional_message_copier - _worker - lineno:95]: copier-0: consumed 750, remaining 25833
      [INFO  - 2021-08-24 07:49:09,034 - transactional_message_copier - _worker - lineno:95]: copier-0: consumed 1500, remaining 25083
      [INFO  - 2021-08-24 07:49:09,279 - transactional_message_copier - _worker - lineno:95]: copier-0: consumed 1500, remaining 25083
      [INFO  - 2021-08-24 07:49:09,553 - transactional_message_copier - _worker - lineno:95]: copier-0: consumed 2250, remaining 24333
      [INFO  - 2021-08-24 07:49:09,793 - transactional_message_copier - _worker - lineno:95]: copier-0: consumed 2250, remaining 24333
      [INFO  - 2021-08-24 07:49:10,059 - transactional_message_copier - _worker - lineno:95]: copier-0: consumed 3000, remaining 23583
      [INFO  - 2021-08-24 07:49:10,312 - transactional_message_copier - _worker - lineno:95]: copier-0: consumed 3750, remaining 22833
      [INFO  - 2021-08-24 07:49:10,661 - transactional_message_copier - _worker - lineno:95]: copier-0: consumed 4500, remaining 22083
      [INFO  - 2021-08-24 07:49:11,056 - transactional_message_copier - _worker - lineno:95]: copier-0: consumed 5250, remaining 21333
      [INFO  - 2021-08-24 07:49:11,402 - transactional_message_copier - _worker - lineno:95]: copier-0: consumed 5250, remaining 21333
      [INFO  - 2021-08-24 07:49:11,698 - transactional_message_copier - _worker - lineno:95]: copier-0: consumed 6000, remaining 20583
      [INFO  - 2021-08-24 07:49:11,876 - transactions_test - bounce_copiers - lineno:144]: copier-0 - progress: 22.570815934996048
      [DEBUG - 2021-08-24 07:49:11,876 - remoteaccount - _log - lineno:160]: ubuntu@worker22: Running ssh command: jps | grep -i TransactionalMessageCopier | awk '{print $1}'
      [INFO  - 2021-08-24 07:49:11,882 - transactional_message_copier - _worker - lineno:95]: copier-0: consumed 6000, remaining 20583
      [DEBUG - 2021-08-24 07:49:12,089 - remoteaccount - _log - lineno:160]: ubuntu@worker22: Running ssh command: kill -9 2195
      [DEBUG - 2021-08-24 07:49:12,129 - remoteaccount - _log - lineno:160]: ubuntu@worker22: Running ssh command: jps | grep -i TransactionalMessageCopier | awk '{print $1}'
      [INFO  - 2021-08-24 07:49:12,425 - background_thread - start_node - lineno:57]: Running TransactionalMessageCopier-0-139963594423096 node 1 on worker22
      [DEBUG - 2021-08-24 07:49:12,426 - remoteaccount - _log - lineno:160]: ubuntu@worker22: Running ssh command: mkdir -p /mnt/transactional_message_copier
      [DEBUG - 2021-08-24 07:49:12,472 - remoteaccount - _log - lineno:160]: ubuntu@worker22: Running ssh command: java -version
      [INFO  - 2021-08-24 07:49:12,526 - transactions_test - bounce_copiers - lineno:144]: copier-0 - progress: 22.570815934996048
      [DEBUG - 2021-08-24 07:49:12,526 - remoteaccount - _log - lineno:160]: ubuntu@worker22: Running ssh command: jps | grep -i TransactionalMessageCopier | awk '{print $1}'
      [INFO  - 2021-08-24 07:49:12,567 - kafka - bootstrap_servers - lineno:2606]: Bootstrap client port is: 9092
      [DEBUG - 2021-08-24 07:49:12,567 - transactional_message_copier - _worker - lineno:85]: TransactionalMessageCopier 1 command: export LOG_DIR=/mnt/transactional_message_copier/logs; export KAFKA_OPTS=; export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/mnt/transactional_message_copier/tools-log4j.properties"; /opt/kafka-dev/bin/kafka-run-class.sh org.apache.kafka.tools.TransactionalMessageCopier --broker-list worker13:9092,worker9:9092,worker20:9092 --transactional-id copier-0 --consumer-group transactions-test-consumer-group --input-topic input-topic --output-topic output-topic --input-partition 0 --transaction-size 750 --transaction-timeout 40000 --enable-random-aborts 2>> /mnt/transactional_message_copier/transactional_message_copier.stderr | tee -a /mnt/transactional_message_copier/transactional_message_copier.stdout &
      [DEBUG - 2021-08-24 07:49:12,567 - remoteaccount - _log - lineno:160]: ubuntu@worker22: Running ssh command: export LOG_DIR=/mnt/transactional_message_copier/logs; export KAFKA_OPTS=; export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/mnt/transactional_message_copier/tools-log4j.properties"; /opt/kafka-dev/bin/kafka-run-class.sh org.apache.kafka.tools.TransactionalMessageCopier --broker-list worker13:9092,worker9:9092,worker20:9092 --transactional-id copier-0 --consumer-group transactions-test-consumer-group --input-topic input-topic --output-topic output-topic --input-partition 0 --transaction-size 750 --transaction-timeout 40000 --enable-random-aborts 2>> /mnt/transactional_message_copier/transactional_message_copier.stderr | tee -a /mnt/transactional_message_copier/transactional_message_copier.stdout &
      [INFO  - 2021-08-24 07:49:58,379 - transactional_message_copier - _worker - lineno:95]: copier-0: consumed 0, remaining 20583
      [INFO  - 2021-08-24 07:49:58,842 - transactional_message_copier - _worker - lineno:95]: copier-0: consumed 0, remaining 20583
      [INFO  - 2021-08-24 07:49:59,112 - transactional_message_copier - _worker - lineno:95]: copier-0: consumed 0, remaining 20583
      

      Attachments

        Issue Links

          Activity

            People

              dajac David Jacot
              dajac David Jacot
              Jason Gustafson Jason Gustafson
              Votes:
              0 Vote for this issue
              Watchers:
              1 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: